package kafka

import (
	"context"
	"errors"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

// MyKafka bundles a kafka-go connection with the coordinates it refers to.
type MyKafka struct {
	// Url holds the broker network address.
	// NOTE(review): verify that constructors populate this field.
	Url string
	// Topic is the topic name the connection was dialed for.
	Topic string
	// Partition is the partition index within Topic.
	Partition int
	// Conn is the underlying connection used by Produce and Consume.
	Conn *kafka.Conn
}

// NewKafkaWithConn dials the leader of the given topic partition at url over
// TCP and returns a MyKafka wrapping the resulting connection.
//
// The dial uses context.Background(), so it cannot be cancelled by the caller.
// On failure the dial error is returned unchanged and the *MyKafka is nil.
func NewKafkaWithConn(url string, topic string, partition int) (*MyKafka, error) {
	conn, err := kafka.DialLeader(context.Background(), "tcp", url, topic, partition)
	if err != nil {
		return nil, err
	}
	// Record url as well: previously the Url field was left empty even though
	// the connection had been dialed against it.
	return &MyKafka{
		Url:       url,
		Topic:     topic,
		Partition: partition,
		Conn:      conn,
	}, nil
}

// Produce writes msgs to the connection as one batch, each string becoming the
// Value of a kafka.Message, under a 10 second write deadline.
//
// It returns an error if msgs is empty, if the deadline cannot be set, or if
// the write fails. On a successful write the connection is closed, so the
// receiver cannot be reused afterwards; the Close error, if any, is returned.
// When the write itself fails the connection is left open for the caller to
// deal with.
func (k *MyKafka) Produce(msgs []string) error {
	if len(msgs) == 0 {
		return errors.New("empty messages")
	}

	// The final length is known, so allocate the batch once.
	batch := make([]kafka.Message, 0, len(msgs))
	for _, msg := range msgs {
		batch = append(batch, kafka.Message{Value: []byte(msg)})
	}

	if err := k.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return err
	}
	if _, err := k.Conn.WriteMessages(batch...); err != nil {
		return err
	}
	return k.Conn.Close()
}

// Consume fetches one batch from the connection under a 10 second read
// deadline, concatenates the values of all messages that were read, and passes
// the combined bytes to do exactly once (possibly with an empty slice).
//
// Each message is read into a 10 KB scratch buffer; reading stops at the first
// error returned by the batch (normally end of batch). Afterwards both the
// batch and the connection are closed, so the receiver cannot be reused.
//
// The returned error is the batch close error if there is one, otherwise the
// connection close error. If both fail, the connection error is logged because
// only one error can be returned.
func (k *MyKafka) Consume(do func([]byte)) error {
	// Fetching is a read operation, so it is the read deadline that bounds it.
	if err := k.Conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return err
	}

	batch := k.Conn.ReadBatch(10e3, 1e6) // min/max bytes to fetch
	buf := make([]byte, 10e3)
	var ret []byte
	for {
		n, err := batch.Read(buf)
		if err != nil {
			// Not inspected here; presumably any real read failure is reported
			// by batch.Close below — see kafka-go Batch docs.
			break
		}
		// Keep only the n bytes of this message, not the whole scratch buffer.
		ret = append(ret, buf[:n]...)
	}
	do(ret)

	// Always release both resources, even when the first Close fails.
	batchErr := batch.Close()
	connErr := k.Conn.Close()
	if batchErr != nil {
		if connErr != nil {
			log.Printf("failed to close connection: %v", connErr)
		}
		return batchErr
	}
	return connErr
}